package com.atguigu.flink.sql.connector;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Created by 黄凯 (Huang Kai) on 2023/6/25 18:22.
 *
 * @author 黄凯
 *
 * FileConnector demo: reads from and writes to files using the
 * Flink SQL {@code filesystem} connector.
 */
public class Flink02_FileConnector {

    /**
     * Reads a CSV file through the filesystem connector, filters rows with
     * {@code vc > 200}, and writes the result back out as JSON files.
     *
     * @param args unused
     * @throws Exception if the INSERT job fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {

        // 1. Create the table environment (default planner settings).
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // 2. Source table backed by the filesystem connector, CSV format.
        //    `file.size` is a connector-provided METADATA column; the
        //    connector also exposes `file.path` and `file.name` if needed.
        String readSql =
                " create table t1 (" +
                        " id STRING , " +
                        " vc INT , " +
                        " ts BIGINT, " +
                        " `file.size` BIGINT NOT NULL METADATA " +
                        ") WITH ( " +
                        " 'connector' = 'filesystem' , " +
                        " 'path' = 'input/ws.txt', " +
                        " 'format' = 'csv' " +
                        ")" ;

        tableEnv.executeSql(readSql);

        // Filter rows and alias the metadata column for the sink schema.
        Table table = tableEnv.sqlQuery("select id ,vc ,ts  , `file.size` as fs from t1  where vc > 200");

        tableEnv.createTemporaryView("t2", table);

        // 3. Sink table backed by the filesystem connector, JSON format.
        String writeSql =
                " create table t3 (" +
                        " id STRING , " +
                        " vc INT , " +
                        " ts BIGINT, " +
                        " fs BIGINT " +
                        ") WITH ( " +
                        " 'connector' = 'filesystem' , " +
                        " 'path' = 'output' ," +
                        " 'format' = 'json' " +
                        ")" ;

        tableEnv.executeSql(writeSql);

        // 4. Copy the filtered rows from t2 into t3.
        //    executeSql submits an INSERT job asynchronously; await() blocks
        //    until it finishes so the JVM does not exit before the output
        //    files are fully written.
        tableEnv.executeSql("insert into t3 select * from t2").await();

    }

}
